feat: use spawned tasks to reduce call stack depth and avoid busy waiting #16319
Conversation
Force-pushed from 8769ce4 to 2a965dc.
```rust
break;
// Spawn a task the first time the stream is polled for the sort phase.
// This ensures the consumer of the sort does not poll unnecessarily
// while the sort is ongoing
```
I may not fully understand this change, but I believe it will start generating input as soon as execute is called, rather than waiting for the first poll...
Another potential problem is that it will disconnect the production of batches from the consumption of them -- in other words, by design I think it will potentially produce batches on a different thread than the one that consumes them.
I intentionally tried to avoid that, and I'm fairly sure it does avoid it, but I'll try to come up with a test that demonstrates it since it's a very important detail.
These kinds of constructs have an Inceptiony quality to them, don't they. If I've got my head wrapped around Futures and async correctly, they're essentially interchangeable and inert until first polled.
What should be happening here is that stream::once creates a stream consisting of a single element which is to be produced by a future. The future in question is only polled the first time the stream is polled.
That future is an async block which on its first poll spawns the task. In other words, the spawn is deferred until first poll. Then we await the spawned task, which is just polling the JoinHandle.
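A minimal sketch of that construct, assuming the futures and tokio crates (the names and the toy "sort" are illustrative, not the actual operator code):

```rust
use futures::stream::{self, StreamExt};
use futures::Stream;

/// Illustrative only: a "sort" whose blocking collect phase runs on a spawned
/// task, but whose spawn is deferred until the returned stream is first polled.
fn deferred_sort_stream() -> impl Stream<Item = i32> {
    // `stream::once` wraps a future; that future is only polled the first
    // time the outer stream is polled, so nothing runs at construction time.
    stream::once(async {
        // First poll: spawn the blocking collect/sort phase on its own task.
        let handle = tokio::task::spawn(async {
            let mut data = vec![3, 1, 2];
            data.sort();
            // The task returns a *stream*, not individual elements; the
            // original caller is still the one that drains it.
            stream::iter(data)
        });
        // Awaiting the JoinHandle just means we get woken when the task is done.
        handle.await.expect("sort task panicked")
    })
    // The single element is itself a stream, so flatten it into one stream.
    .flatten()
}

#[tokio::main]
async fn main() {
    let sorted: Vec<i32> = deferred_sort_stream().collect().await;
    assert_eq!(sorted, vec![1, 2, 3]);
}
```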
Anyway, I don't want you to take my word for it. I'll get to work on a test case.
I might have gotten this completely wrong, but that should not be the case. You're correct that preparing the stream potentially happens on a different thread, but what gets returned by the task is the stream itself, not the individual record batches. The produced stream should still be getting drained by the original task. What you're describing is what RecordBatchReceiverStream does, which is quite different and requires channels for the inter-thread communication.
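For contrast, a rough sketch of that channel-based pattern (assuming the tokio and tokio-stream crates; this is illustrative, not the actual RecordBatchReceiverStream implementation), where production really does happen on a different task than consumption:

```rust
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<i32>(8);

    // Producer task: generates items independently of the consumer.
    tokio::spawn(async move {
        for item in 0..3 {
            if tx.send(item).await.is_err() {
                break; // consumer went away
            }
        }
    });

    // Consumer: drains the receiving end as a stream on the original task;
    // the channel is what carries items across tasks/threads.
    let received: Vec<i32> = ReceiverStream::new(rx).collect().await;
    assert_eq!(received, vec![0, 1, 2]);
}
```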
I've added a test case that attempts to demonstrate that processing is deferred. If this looks ok to you I can add the same thing for the other touched code as well.
I'm not sure how I can demonstrate the absence of multi-threading in a test case.
Wrt comprehensibility, I have to admit I am still very much in the learning-as-I-go phase of using the futures crate. There might be a more elegant or straightforward way to express this construct.
Thanks @pepijnve -- this is an interesting idea, but I have a concern. I'll also fire off some benchmarks to see if it has a measurable impact
🤖: Benchmark completed
@alamb I've been trying to make sense of what to do with the benchmark results. They always seem to give me very mixed results when I run them locally (that's part of why I did the min/max/stddev thing, to try to get more insight). Some tests are slower, but the total and average time increase is marginal. Should I take a closer look at the 1.13x slower one? clickbench_extended seems to have a consistent penalty. I'll try to understand why that is.
I had a look at

Looking at the
Query 13: 1.13x slower
Query 28: 1.10x faster
Force-pushed from 7828d0e to 0e4fbdb.
In what situations would these changes lead to better performance?
(Or is it just benchmark noise?)
The jury is still out on whether it makes sense or not. I can explain my theoretical reasoning. Apologies up front if I'm writing too pedantically; I'm just trying to explain things as clearly as I can. I'm not a database guy, so this may sound hopelessly naive.

The first observation is that yielding all the way to the runtime in Tokio requires stack unwinding. That's the nature of stackless tasks. The deeper your call stack is, the more call frames you need to unwind and the more calls you'll need to redo to get back to the yield point. I've been trying to find information on whether the Rust compiler does some magic to avoid this, but as far as I can tell that's not the case. I did find hints that it optimizes nested async function calls, but it will not do so for nested dyn Stream poll_next calls. That makes sense; an AOT compiler will typically not be able to optimize across virtual function calls.

The second observation is that DataFusion's volcano model naturally leads to fairly deep call stacks. The tree of execution plans results in a tree of streams, and a parent stream's poll_next will often directly call poll_next on a child. If you get one of these deep call stacks, yielding from the deepest point potentially means unwinding the whole thing and coming back. This is mitigated a bit already when volcano-breaking operators like repartition are present in the plan. The deepest call stacks are seen when running with

Third, pipeline-breaking operators are intrinsically two-phase: first they collect, then they emit. There's a gray area of course, but I'm talking about the classic ones like single aggregation. While a pipeline-breaking stream is in its collect phase, it can be 100% sure that it will not have any new data for the poll_next caller until that phase completes. There's really not much point in telling the caller

Combining all that, I think you're looking for deep query plans with nested pipeline breakers. In a different PR someone pointed me to this query
The nested sorts in the physical plan are something of a worst case scenario. At the deepest sort you have a 12-level-deep call stack that gets reactivated for every yield. If instead we chop this into 6 chained spawned tasks, you get 6 much shallower call stacks. Of those tasks only one will be active; the other ones will be inert until there's actually something to do.

A second factor can be the data source. Filesystem streams tend to be always ready; others may not be. The more the source returns Pending, the more you'll see the overhead described above show up, I think.

All of this assumes, of course, that going up and back down the call stack has a non-trivial cost. Perhaps it's not significant enough to be measurable. I'm still figuring out how to best profile this stuff, so I'm afraid I don't have anything more fact-based to give you yet. Besides performance there's a small aesthetic aspect to this. I find that a stream that responds with
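To make the call-stack point concrete, here is a toy sketch (not DataFusion code): each wrapper layer's poll_next simply forwards to its child, so a Pending from the innermost stream has to travel back through every one of these frames, and every frame is re-entered on the next poll.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

/// Toy stand-in for an operator stream that directly polls its child stream.
struct PassThrough<S> {
    child: S,
}

impl<S: Stream + Unpin> Stream for PassThrough<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        // Every layer adds one call frame between the runtime and the leaf.
        Pin::new(&mut self.child).poll_next(cx)
    }
}

fn main() {
    use futures::executor::block_on;
    use futures::stream::{self, StreamExt};

    // Three nested layers here; a real plan with nested sorts can be much deeper.
    let leaf = stream::iter(1..=3);
    let plan = PassThrough { child: PassThrough { child: PassThrough { child: leaf } } };
    let out: Vec<i32> = block_on(plan.collect());
    assert_eq!(out, vec![1, 2, 3]);
}
```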
I think I'll put this one in draft for now. Benchmark results say "needs more work and performance analysis" to me.
Googling a bit, I'm starting to get the impression that I shouldn't think of Tokio tasks as being as lightweight as coroutines in some other ecosystems.
#16357 might be relevant here. I was testing on an old system with spinning disks. Going to retest with more iterations and this change to make sure I'm not measuring noise.
@alamb @Dandandan I started a benchmark run with 50 iterations and the TPCH benchmark change that eliminates local filesystem access yesterday evening. Checked the results this morning... 😌 Would be great if someone else could confirm. If you would like to reproduce, this was

Long story short, in my environment at least, the 5 iterations setting the bench.sh script uses is just too little. Especially in the first 5-10 iterations I see way too much variability in the runs for it to be useful. It stabilizes later on. I wonder if it would be a good idea to modify the benchmark code to always do a number of warmup iterations before we actually start measuring.
I did a second identical run because those results seemed just too good to be true to me. This is much closer to what I was expecting: more or less status quo. That does mean I'm back to getting wildly differing results which I can't really explain. This is on an old repurposed PowerEdge R730 running in an Ubuntu 24.04 VM on ESXi. It's the only VM on the machine, so it can't be noisy neighbors. Maybe the hardware is just starting to get flaky.

Fastest time comparison

Average time comparison
🤦♂️ That's not very useful now, is it. I need a better machine to test on.

min/avg/max
Force-pushed from c3c7918 to b708c11.
Which issue does this PR close?
Rationale for this change
Yielding to the runtime in Tokio involves unwinding the call stack. When a query contains many nested pipeline blockers when it yields it's likely to do so from quite deep. PRs #16196 and/or #16301 increase the frequency of this.
Luckily Tokio provides just the tool to solve this: spawned tasks. By moving the blocking portion of operators to a spawned task, the call stack depth is significantly reduced. Additionally, the caller no longer needs to poll the blocking task in a busy loop, since it will only get woken when the spawned task completes, which is the signal that the stream is ready to start emitting data.
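As an illustration of the busy-wait pattern being avoided (a hypothetical toy stream, not the actual operator code; assumes the tokio and futures crates): while its collect phase is unfinished, such a stream wakes itself and returns Pending, so the caller re-enters the full call stack on every poll without receiving any data.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

/// Hypothetical pipeline-breaking stream that busy-waits during its collect phase.
struct BusyCollectStream {
    collect_steps_left: u32,
    emitted: bool,
}

impl Stream for BusyCollectStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.collect_steps_left > 0 {
            // One slice of the collect phase, then immediately ask to be polled again.
            self.collect_steps_left -= 1;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        if !self.emitted {
            // Collect phase done; switch to the emit phase.
            self.emitted = true;
            Poll::Ready(Some(42))
        } else {
            Poll::Ready(None)
        }
    }
}

#[tokio::main]
async fn main() {
    use futures::StreamExt;
    let stream = BusyCollectStream { collect_steps_left: 1_000, emitted: false };
    // The runtime polls this stream roughly 1_000 extra times before any data appears.
    let out: Vec<u32> = stream.collect().await;
    assert_eq!(out, vec![42]);
}
```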
What this change effectively does is chop a chain of n dependent pipeline blockers into n sub tasks. Only one of these subtasks will actually be scheduled by the runtime at any given time. The others will simply wait until their direct dependent is ready to emit data.

Possible alternative
It could be interesting to generalize the pattern of a pipeline-blocking operator having a blocking prepare phase followed by a streaming emit phase. I think this would probably have to take the shape of support types for operator implementations, since this is rather tightly coupled to the internal implementation of an operator. I did not attempt to design this kind of support code yet. I don't think having general support code is a strict prerequisite for this PR though. That's something that can be done in a later refactor since the current change is 100% implementation details.
What changes are included in this PR?
Wrap the blocking portion of sort and join (build phase) in a spawned task.
The spawned tasks are kicked off the first time the stream is polled, not when execute is called. The guideline on this is not yet clear to me, but this is related to #16312.
The aggregation operators have not been modified in this PR yet, but those could benefit from the same change. If this direction is deemed promising I will update those as well.
Are these changes tested?
No new tests added, covered by existing tests.
❌ The WASM tests fail because they do not yet run in a Tokio context. Looking for feedback on whether that's a showstopper or not and how to fix it.
Are there any user-facing changes?
No, the modified operators yield more efficiently but nothing else changes.